home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import bisect
- import gobject
- import orca_state
- import pyatspi
- import speech
- import copy
- import time
- from orca_i18n import _
- LIVE_OFF = -1
- LIVE_NONE = 0
- LIVE_POLITE = 1
- LIVE_ASSERTIVE = 2
- LIVE_RUDE = 3
- MSG_KEEPALIVE_TIME = 45
- CACHE_SIZE = 9
-
- class PriorityQueue:
- ''' This class represents a thread **UNSAFE** priority queue where priority
- is determined by the given integer priority. The entries are also
- maintained in chronological order.
-
- TODO: experiment with Queue.Queue to make thread safe
- '''
-
- def __init__(self):
- self.queue = []
-
-
- def enqueue(self, data, priority, obj):
- ''' Add a new element to the queue according to 1) priority and
- 2) timestamp. '''
- bisect.insort_left(self.queue, (priority, time.time(), data, obj))
-
-
- def dequeue(self):
- '''get the highest priority element from the queue. '''
- return self.queue.pop(0)
-
-
- def clear(self):
- ''' Clear the queue '''
- self.queue = []
-
-
- def purgeByKeepAlive(self):
- ''' Purge items from the queue that are older than the keepalive
- time '''
- currenttime = time.time()
-
- myfilter = lambda item: item[1] + MSG_KEEPALIVE_TIME > currenttime
- self.queue = filter(myfilter, self.queue)
-
-
- def purgeByPriority(self, priority):
- ''' Purge items from the queue that have a lower than or equal priority
- than the given argument '''
-
- myfilter = lambda item: item[0] > priority
- self.queue = filter(myfilter, self.queue)
-
-
- def clumpContents(self):
- """ Combines messages with the same 'label' by appending newer
- 'content' and removing the newer message. This operation is only
- applied to the next dequeued message for performance reasons and is
- often applied in conjunction with filterContents() """
- if len(self.queue):
- newqueue = []
- newqueue.append(self.queue[0])
- targetlabels = newqueue[0][2]['labels']
- targetcontent = newqueue[0][2]['content']
- for i in range(1, len(self.queue)):
- if self.queue[i][2]['labels'] == targetlabels:
- newqueue[0][2]['content'].extend(self.queue[i][2]['content'])
- continue
- newqueue.append(self.queue[i])
-
- self.queue = newqueue
-
-
-
- def filterContents(self):
- ''' Combines utterances by eliminating repeated utterances and
- utterances that are part of other utterances. '''
- if len(self.queue[0][2]['content']) > 1:
- oldcontent = self.queue[0][2]['content']
- newcontent = [
- oldcontent[0]]
- for i in range(1, len(oldcontent)):
- found = False
- for j in range(len(newcontent)):
- if oldcontent[i].find(newcontent[j]) != -1 or newcontent[j].find(oldcontent[i]) != -1:
- if len(oldcontent[i]) > len(newcontent[j]):
- newcontent[j] = oldcontent[i]
-
- found = True
- break
- continue
-
- if not found:
- newcontent.append(oldcontent[i])
- continue
-
- self.queue[0][2]['content'] = newcontent
-
-
-
- def __len__(self):
- ''' Return the length of the queue '''
- return len(self.queue)
-
-
-
- class LiveRegionManager:
-
- def __init__(self, script):
- self._script = script
- self.msg_queue = PriorityQueue()
- self.msg_cache = []
- self._politenessOverrides = None
- self._restoreOverrides = None
- self.lastliveobj = None
- self.monitoring = True
- self.bookmarkLoadHandler()
- script.bookmarks.addSaveObserver(self.bookmarkSaveHandler)
- script.bookmarks.addLoadObserver(self.bookmarkLoadHandler)
-
-
- def reset(self):
- newpoliteness = { }
- currenturi = self._script.bookmarks.getURIKey()
- for key, value in self._politenessOverrides.iteritems():
- if key[0] == currenturi or value != LIVE_NONE:
- newpoliteness[key] = value
- continue
-
- self._politenessOverrides = newpoliteness
-
-
- def bookmarkSaveHandler(self):
- '''Bookmark save callback'''
- self._script.bookmarks.saveBookmarksToDisk(self._politenessOverrides, filename = 'politeness')
-
-
- def bookmarkLoadHandler(self):
- '''Bookmark load callback'''
- if not self._script.bookmarks.readBookmarksFromDisk(filename = 'politeness'):
- pass
- self._politenessOverrides = { }
-
-
- def handleEvent(self, event):
- '''Main live region event handler'''
- politeness = self._getLiveType(event.source)
- if politeness == LIVE_OFF:
- return None
- if politeness == LIVE_NONE:
- if not self.monitoring:
- return None
- elif politeness == LIVE_POLITE:
- pass
- elif politeness == LIVE_ASSERTIVE:
- self.msg_queue.purgeByPriority(LIVE_POLITE)
- elif politeness == LIVE_RUDE:
- self.msg_queue.purgeByPriority(LIVE_ASSERTIVE)
-
- message = self._getMessage(event)
- if message:
- if len(self.msg_queue) == 0:
- gobject.timeout_add(100, self.pumpMessages)
-
- self.msg_queue.enqueue(message, politeness, event.source)
-
-
-
- def pumpMessages(self):
- ''' Main gobject callback for live region support. Handles both
- purging the message queue and outputting any queued messages that
- were queued up in the handleEvent() method.
- '''
- if len(self.msg_queue) > 0 and not speech.isSpeaking() and time.time() - orca_state.lastInputEvent.time > 1:
- self.msg_queue.purgeByKeepAlive()
- self.msg_queue.clumpContents()
- self.msg_queue.filterContents()
- (politeness, timestamp, message, obj) = self.msg_queue.dequeue()
- if message['labels'] == message['content']:
- utts = message['content']
- else:
- utts = message['labels'] + message['content']
- speech.speakUtterances(utts)
- self.lastliveobj = obj
- self._cacheMessage(utts)
-
- if not self.monitoring:
- self.msg_queue.purgeByKeepAlive()
-
- if len(self.msg_queue) > 0:
- return True
- return False
-
-
- def getLiveNoneObjects(self):
- '''Return the live objects that are registered and have a politeness
- of LIVE_NONE. '''
- retval = []
- currenturi = self._script.bookmarks.getURIKey()
- for uri, objectid in self._politenessOverrides:
- if uri == currenturi and isinstance(objectid, tuple):
- retval.append(self._script.bookmarks.pathToObj(objectid))
- continue
-
- return retval
-
-
- def advancePoliteness(self, obj):
- '''Advance the politeness level of the given object'''
- utterances = []
- objectid = self._getObjectId(obj)
- uri = self._script.bookmarks.getURIKey()
-
- try:
- cur_priority = self._politenessOverrides[(uri, objectid)]
- except KeyError:
- cur_priority = self._liveStringToType(obj)
-
- if cur_priority == LIVE_OFF or cur_priority == LIVE_NONE:
- self._politenessOverrides[(uri, objectid)] = LIVE_POLITE
- utterances.append(_('setting live region to polite'))
- elif cur_priority == LIVE_POLITE:
- self._politenessOverrides[(uri, objectid)] = LIVE_ASSERTIVE
- utterances.append(_('setting live region to assertive'))
- elif cur_priority == LIVE_ASSERTIVE:
- self._politenessOverrides[(uri, objectid)] = LIVE_RUDE
- utterances.append(_('setting live region to rude'))
- elif cur_priority == LIVE_RUDE:
- self._politenessOverrides[(uri, objectid)] = LIVE_OFF
- utterances.append(_('setting live region to off'))
-
- speech.speakUtterances(utterances)
-
-
- def goLastLiveRegion(self):
- '''Move the caret to the last announced live region and speak the
- contents of that object'''
- if self.lastliveobj:
- self._script.setCaretPosition(self.lastliveobj, 0)
- self._script.outlineAccessible(self.lastliveobj)
- self._script.speakContents(self._script.getObjectContentsAtOffset(self.lastliveobj, 0))
-
-
-
- def reviewLiveAnnouncement(self, msgnum):
- '''Speak the given number cached message'''
- if msgnum > len(self.msg_cache):
- speech.speak(_('no live message saved'))
- else:
- speech.speakUtterances(self.msg_cache[-msgnum])
-
-
- def setLivePolitenessOff(self):
- '''User toggle to set all live regions to LIVE_OFF or back to their
- original politeness.'''
- docframe = self._script.getDocumentFrame()
- uri = self._script.bookmarks.getURIKey()
- if self.monitoring:
- speech.speak(_('All live regions set to off'))
- self.msg_queue.clear()
- self._restoreOverrides = copy.copy(self._politenessOverrides)
- for override in self._politenessOverrides.keys():
- self._politenessOverrides[override] = LIVE_OFF
-
- matches = pyatspi.findAllDescendants(docframe, self.matchLiveRegion)
- for match in matches:
- objectid = self._getObjectId(match)
- self._politenessOverrides[(uri, objectid)] = LIVE_OFF
-
- self.monitoring = False
- else:
- for key, value in self._restoreOverrides.iteritems():
- self._politenessOverrides[key] = value
-
- speech.speak(_('live regions politeness levels restored'))
- self.monitoring = True
-
-
- def outputLiveRegionDescription(self, obj):
- '''Used in conjuction with whereAmI to output description and
- politeness of the given live region object'''
- objectid = self._getObjectId(obj)
- uri = self._script.bookmarks.getURIKey()
- utterances = []
- for relation in obj.getRelationSet():
- relationtype = relation.getRelationType()
- if relationtype == pyatspi.RELATION_DESCRIBED_BY:
- targetobj = relation.getTarget(0)
-
- try:
- description = targetobj.queryText().getText(0, -1)
- if description.strip() != obj.description.strip():
- utterances.append(description)
- except NotImplemented:
- pass
- except:
- None<EXCEPTION MATCH>NotImplemented
-
-
- None<EXCEPTION MATCH>NotImplemented
-
-
- try:
- livepriority = self._politenessOverrides[(uri, objectid)]
- liveprioritystr = self._liveTypeToString(livepriority)
- except KeyError:
- liveprioritystr = 'none'
-
- if utterances or liveprioritystr != 'none':
- utterances.append(_('politeness level %s') % liveprioritystr)
- speech.speakUtterances(utterances)
-
-
-
- def matchLiveRegion(self, obj):
- '''Predicate used to find a live region'''
- attrs = self._getAttrDictionary(obj)
- return 'container-live' in attrs
-
-
- def _getMessage(self, event):
- '''Gets the message associated with a given live event.'''
- attrs = self._getAttrDictionary(event.source)
- content = []
- labels = []
- if event.type.startswith('object:children-changed:add'):
-
- try:
- if attrs['container-atomic'] == 'true':
- newcontent = self._script.expandEOCs(event.source)
- else:
- newcontent = self._script.expandEOCs(event.any_data)
- except (KeyError, TypeError):
- newcontent = self._script.expandEOCs(event.any_data)
-
- if newcontent:
- content.append(newcontent)
- else:
- return None
- newcontent
-
- try:
- sourceitext = event.source.queryText()
- except NotImplementedError:
- return None
-
- txt = sourceitext.getText(0, -1)
- if txt.count(self._script.EMBEDDED_OBJECT_CHARACTER) > 0:
- return None
-
- try:
- if attrs['container-atomic'] == 'true':
- newcontent = txt
- else:
- newcontent = txt[event.detail1:event.detail1 + event.detail2]
- except KeyError:
- txt.count(self._script.EMBEDDED_OBJECT_CHARACTER) > 0
- txt.count(self._script.EMBEDDED_OBJECT_CHARACTER) > 0
- newcontent = txt[event.detail1:event.detail1 + event.detail2]
- except:
- txt.count(self._script.EMBEDDED_OBJECT_CHARACTER) > 0
-
- if len(newcontent) > 0:
- content.append(newcontent)
- else:
- return None
- labels = (txt.count(self._script.EMBEDDED_OBJECT_CHARACTER) > 0)._getLabelsAsUtterances(event.source)
- if 'channel' in attrs and attrs['channel'] == 'notify':
- utts = labels + content
- speech.stop()
- speech.speakUtterances(utts)
- return None
- return {
- 'content': content,
- 'labels': labels }
-
-
- def flushMessages(self):
- self.msg_queue.clear()
-
-
- def _cacheMessage(self, utts):
- '''Cache a message in our cache list of length CACHE_SIZE'''
- self.msg_cache.append(utts)
- if len(self.msg_cache) > CACHE_SIZE:
- self.msg_cache.pop(0)
-
-
-
- def _getLabelsAsUtterances(self, obj):
- '''Get the labels for a given object'''
- uttstring = self._script.getDisplayedLabel(obj)
- if uttstring:
- return [
- uttstring.strip()]
- return []
-
-
- def _getLiveType(self, obj):
- '''Returns the live politeness setting for a given object. Also,
- registers LIVE_NONE objects in politeness overrides when monitoring.'''
- objectid = self._getObjectId(obj)
- uri = self._script.bookmarks.getURIKey()
- if (uri, objectid) in self._politenessOverrides:
- return self._politenessOverrides[(uri, objectid)]
- livetype = self._liveStringToType(obj)
- if livetype == LIVE_NONE and self.monitoring:
- self._politenessOverrides[(uri, objectid)] = livetype
-
- return livetype
-
-
- def _getObjectId(self, obj):
- """Returns the HTML 'id' or a path to the object is an HTML id is
- unavailable"""
- attrs = self._getAttrDictionary(obj)
- if attrs is None:
- return self._getPath(obj)
-
- try:
- return attrs['id']
- except KeyError:
- attrs is None
- attrs is None
- return self._getPath(obj)
-
-
-
- def _liveStringToType(self, obj, attributes = None):
- '''Returns the politeness enum for a given object'''
- if not attributes:
- pass
- attrs = self._getAttrDictionary(obj)
-
- try:
- if attrs['container-live'] == 'off':
- return LIVE_OFF
- if attrs['container-live'] == 'polite':
- return LIVE_POLITE
- if attrs['container-live'] == 'assertive':
- return LIVE_ASSERTIVE
- if attrs['container-live'] == 'rude':
- return LIVE_RUDE
- return LIVE_NONE
- except KeyError:
- return LIVE_NONE
-
-
-
- def _liveTypeToString(self, politeness):
- '''Returns the politeness level as a string given a politeness enum'''
- if politeness == LIVE_OFF:
- return 'off'
- if politeness == LIVE_POLITE:
- return 'polite'
- if politeness == LIVE_ASSERTIVE:
- return 'assertive'
- if politeness == LIVE_RUDE:
- return 'rude'
- if politeness == LIVE_NONE:
- return 'none'
- return 'unknown'
-
-
- def _getAttrDictionary(self, obj):
-
- try:
- return []([ attr.split(':', 1) for attr in obj.getAttributes() ])
- except:
- return { }
-
-
-
- def _getPath(self, obj):
- ''' Returns, as a tuple of integers, the path from the given object
- to the document frame.'''
- docframe = self._script.getDocumentFrame()
- path = []
- while obj.parent is None or obj == docframe:
- path.reverse()
- return tuple(path)
-
- try:
- path.append(obj.getIndexInParent())
- except Exception:
- raise LookupError
-
- obj = obj.parent
- continue
- return None
-
-
-